---
title: Streams
sidebarTitle: Streams
description: Read and consume real-time streaming data from your tasks in your backend
---

The Streams API allows you to read streaming data from your Trigger.dev tasks in your backend code. This is particularly useful for consuming AI/LLM outputs, progress updates, or any other real-time data that your tasks emit.

<Note>
  To learn how to emit streams from your tasks, see our [Realtime Streams](/tasks/streams) documentation. For frontend applications using React, see our [React hooks streams documentation](/realtime/react-hooks/streams).
</Note>

## Reading streams

### Using defined streams (Recommended)

The recommended approach is to use [defined streams](/tasks/streams#defining-typed-streams-recommended) for full type safety:

```ts
import { streams } from "@trigger.dev/sdk";
import { aiStream } from "./trigger/streams";

/**
 * Reads the defined `aiStream` of a run to completion, logging each chunk as
 * it arrives and the concatenated text once the stream ends.
 */
async function consumeStream(runId: string) {
  // `read()` resolves to an async-iterable of chunks from the defined stream
  const chunkStream = await aiStream.read(runId);

  let fullText = "";
  for await (const piece of chunkStream) {
    console.log("Received chunk:", piece); // piece is typed by the stream definition
    fullText += piece;
  }

  console.log("Final text:", fullText);
}
```

### Direct stream reading

If you prefer not to use defined streams, you can read directly by specifying the stream key:

```ts
import { streams } from "@trigger.dev/sdk";

/**
 * Logs every chunk of a run's stream, addressing the stream by its key
 * rather than through a defined stream.
 */
async function consumeStream(runId: string) {
  // Key of the stream to read from
  const streamKey = "ai-output";
  const chunkStream = await streams.read<string>(runId, streamKey);

  for await (const piece of chunkStream) {
    console.log("Received chunk:", piece);
  }
}
```

### Reading from the default stream

Every run has a default stream, so you can omit the stream key:

```ts
import { streams } from "@trigger.dev/sdk";

/**
 * Logs every chunk of a run's default stream (no stream key is passed).
 */
async function consumeDefaultStream(runId: string) {
  // Omitting the key reads from the default stream
  const defaultStream = await streams.read<string>(runId);

  for await (const piece of defaultStream) {
    console.log("Received chunk:", piece);
  }
}
```

## Stream options

The `read()` method accepts several options for controlling stream behavior:

### Timeout

Set a timeout to stop reading if no data is received within a specified time:

```ts
import { streams } from "@trigger.dev/sdk";
import { aiStream } from "./trigger/streams";

/**
 * Logs every chunk of `aiStream` for the given run, giving up when the read
 * times out.
 *
 * A timeout is reported with a log line. Any other failure is rethrown so it
 * is not silently discarded.
 */
async function consumeWithTimeout(runId: string) {
  const stream = await aiStream.read(runId, {
    timeoutInSeconds: 120, // Wait up to 2 minutes for data
  });

  try {
    for await (const chunk of stream) {
      console.log("Received chunk:", chunk);
    }
  } catch (error) {
    // `error` is `unknown` under strict TypeScript, so narrow before reading `.name`
    if (error instanceof Error && error.name === "TimeoutError") {
      console.log("Stream timed out");
    } else {
      // Not a timeout: let the caller decide how to handle it
      throw error;
    }
  }
}
```

### Start index

Resume reading from a specific chunk index (useful for reconnection scenarios):

```ts
import { streams } from "@trigger.dev/sdk";
import { aiStream } from "./trigger/streams";

/**
 * Resumes reading `aiStream` for a run after a reconnect.
 *
 * @param runId - ID of the run whose stream is read.
 * @param lastChunkIndex - Index of the last chunk that was already received.
 */
async function resumeStream(runId: string, lastChunkIndex: number) {
  // Start reading from the chunk after the last one we received
  const nextIndex = lastChunkIndex + 1;
  const resumedStream = await aiStream.read(runId, { startIndex: nextIndex });

  for await (const piece of resumedStream) {
    console.log("Received chunk:", piece);
  }
}
```

### Abort signal

Use an `AbortSignal` to cancel stream reading:

```ts
import { streams } from "@trigger.dev/sdk";
import { aiStream } from "./trigger/streams";

/**
 * Logs every chunk of `aiStream` for the given run until the stream ends, a
 * chunk contains "STOP", or 30 seconds have passed.
 *
 * Cancellation is reported with a log line. Any other failure is rethrown so
 * it is not silently discarded.
 */
async function consumeWithCancellation(runId: string) {
  const controller = new AbortController();

  // Cancel after 30 seconds
  const abortTimer = setTimeout(() => controller.abort(), 30000);

  try {
    const stream = await aiStream.read(runId, {
      signal: controller.signal,
    });

    for await (const chunk of stream) {
      console.log("Received chunk:", chunk);

      // Optionally abort based on content
      if (chunk.includes("STOP")) {
        controller.abort();
      }
    }
  } catch (error) {
    // `error` is `unknown` under strict TypeScript, so narrow before reading `.name`
    if (error instanceof Error && error.name === "AbortError") {
      console.log("Stream was cancelled");
    } else {
      throw error;
    }
  } finally {
    // Always release the timer so it does not fire (or keep the process alive)
    // after the stream has already finished
    clearTimeout(abortTimer);
  }
}
```

### Combining options

You can combine multiple options:

```ts
import { streams } from "@trigger.dev/sdk";
import { aiStream } from "./trigger/streams";

/**
 * Logs every chunk of `aiStream` for the given run, combining the timeout,
 * start index and abort signal options in a single `read()` call.
 *
 * Cancellation and timeout are each reported with a log line. Any other
 * failure is logged as an error.
 */
async function advancedStreamConsumption(runId: string) {
  const controller = new AbortController();

  const stream = await aiStream.read(runId, {
    timeoutInSeconds: 300, // 5 minute timeout
    startIndex: 0, // Start from the beginning
    signal: controller.signal, // Allow cancellation
  });

  try {
    for await (const chunk of stream) {
      console.log("Received chunk:", chunk);
    }
  } catch (error) {
    // `error` is `unknown` under strict TypeScript, so narrow before reading `.name`
    const errorName = error instanceof Error ? error.name : undefined;

    if (errorName === "AbortError") {
      console.log("Stream was cancelled");
    } else if (errorName === "TimeoutError") {
      console.log("Stream timed out");
    } else {
      console.error("Stream error:", error);
    }
  }
}
```

## Practical examples

### Reading AI streaming responses

Here's a complete example of consuming an AI stream from your backend:

```ts
import { streams } from "@trigger.dev/sdk";
import { aiStream } from "./trigger/streams";

/**
 * Collects a complete AI response from `aiStream` for the given run.
 *
 * @returns The concatenated response text together with the individual chunks.
 */
async function consumeAIStream(runId: string) {
  const responseStream = await aiStream.read(runId, {
    timeoutInSeconds: 300, // AI responses can take time
  });

  const chunks: string[] = [];

  for await (const piece of responseStream) {
    chunks.push(piece);

    // Process each chunk as it arrives
    console.log("Chunk received:", piece);

    // Could send to websocket, SSE, etc.
    // await sendToClient(piece);
  }

  // The chunks are strings, so joining them equals appending each one in turn
  const fullResponse = chunks.join("");

  console.log("Stream complete!");
  console.log("Total chunks:", chunks.length);
  console.log("Full response:", fullResponse);

  return { fullResponse, chunks };
}
```

### Reading multiple streams

If a task emits multiple streams, you can read them concurrently with `Promise.all` (as shown below) or sequentially by awaiting each read in turn:

```ts
import { streams } from "@trigger.dev/sdk";
import { aiStream, progressStream } from "./trigger/streams";

/**
 * Drains the AI stream and the progress stream of a run at the same time.
 *
 * @returns The chunks collected from each stream.
 */
async function consumeMultipleStreams(runId: string) {
  // Read streams concurrently: both reads are started before either is awaited
  const aiRead = consumeStream(aiStream, runId);
  const progressRead = consumeStream(progressStream, runId);

  const [aiData, progressData] = await Promise.all([aiRead, progressRead]);

  return { aiData, progressData };
}

/**
 * Reads a stream to completion and returns all of its chunks.
 *
 * @param streamDef - Anything with a `read(runId)` method that resolves to an
 *   async-iterable of chunks. Only the standard `AsyncIterable` contract is
 *   required, so no SDK-specific stream type has to be imported.
 * @param runId - ID of the run whose stream is read.
 * @returns The chunks in the order they were received (empty if the stream
 *   yields nothing).
 */
async function consumeStream<T>(
  streamDef: { read: (runId: string) => Promise<AsyncIterable<T>> },
  runId: string
): Promise<T[]> {
  const stream = await streamDef.read(runId);
  const chunks: T[] = [];

  for await (const chunk of stream) {
    chunks.push(chunk);
  }

  return chunks;
}
```

### Piping streams to HTTP responses

You can pipe streams directly to HTTP responses for server-sent events (SSE):

```ts
import { streams } from "@trigger.dev/sdk";
import { aiStream } from "./trigger/streams";
import type { NextRequest } from "next/server";

/**
 * Route handler that relays the `aiStream` chunks of a run to the client as
 * server-sent events (SSE).
 *
 * Expects a `runId` query parameter and responds with status 400 when it is
 * missing. The request's abort signal is forwarded to `read()` so the
 * upstream read is cancelled when the request is aborted (for example when
 * the client goes away) instead of running until the timeout.
 */
export async function GET(request: NextRequest) {
  const runId = request.nextUrl.searchParams.get("runId");

  if (!runId) {
    return new Response("Missing runId", { status: 400 });
  }

  const stream = await aiStream.read(runId, {
    timeoutInSeconds: 300,
    signal: request.signal, // Stop reading once the request is aborted
  });

  // Create a readable stream for SSE
  const encoder = new TextEncoder();
  const readableStream = new ReadableStream({
    async start(controller) {
      try {
        for await (const chunk of stream) {
          // Format as SSE: each event is a `data:` line followed by a blank line
          const data = `data: ${JSON.stringify({ chunk })}\n\n`;
          controller.enqueue(encoder.encode(data));
        }
        controller.close();
      } catch (error) {
        // Surface read failures (timeout, abort, ...) to the response stream
        controller.error(error);
      }
    },
  });

  return new Response(readableStream, {
    headers: {
      "Content-Type": "text/event-stream",
      "Cache-Control": "no-cache",
      "Connection": "keep-alive",
    },
  });
}
```

### Implementing retry logic

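Handle transient errors with retry logic. The example below retries with exponential backoff and uses `startIndex` to resume from the first chunk it has not yet received, so chunks are not duplicated across attempts: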

```ts
import { streams } from "@trigger.dev/sdk";
import { aiStream } from "./trigger/streams";

/**
 * Reads all chunks of `aiStream` for a run, retrying failed reads with
 * exponential backoff and resuming where the previous attempt stopped.
 *
 * @param runId - ID of the run whose stream is read.
 * @param maxRetries - Total number of read attempts before giving up. With a
 *   value below 1 no read is attempted and an empty array is returned.
 * @returns Every chunk received across all attempts, in arrival order.
 * @throws Error when the final attempt fails; the message includes the reason
 *   of the last failure.
 */
async function consumeStreamWithRetry(
  runId: string,
  maxRetries = 3
): Promise<string[]> {
  const allChunks: string[] = [];

  for (let attempt = 1; attempt <= maxRetries; attempt++) {
    try {
      const stream = await aiStream.read(runId, {
        // Chunks collected so far occupy indices 0..length-1, so resume at `length`
        startIndex: allChunks.length,
        timeoutInSeconds: 120,
      });

      for await (const chunk of stream) {
        allChunks.push(chunk);
      }

      // Success! The stream ended without an error
      return allChunks;
    } catch (error) {
      // `error` is `unknown` under strict TypeScript, so narrow before reading `.message`
      const reason = error instanceof Error ? error.message : String(error);

      if (attempt >= maxRetries) {
        throw new Error(`Failed after ${maxRetries} attempts: ${reason}`);
      }

      console.log(`Retry attempt ${attempt} after error:`, reason);

      // Wait before retrying (exponential backoff: 2s, 4s, 8s, ...)
      await new Promise((resolve) => setTimeout(resolve, 1000 * 2 ** attempt));
    }
  }

  // Only reached when `maxRetries` is below 1 and no attempt was made
  return allChunks;
}
```

### Processing streams in chunks

Process streams in batches for efficiency:

```ts
import { streams } from "@trigger.dev/sdk";
import { aiStream } from "./trigger/streams";

/**
 * Reads `aiStream` for a run and hands its chunks to `processBatch` in groups.
 *
 * @param runId - ID of the run whose stream is read.
 * @param batchSize - Number of chunks to collect before a batch is processed.
 */
async function processStreamInBatches(runId: string, batchSize = 10) {
  const chunkStream = await aiStream.read(runId);

  const pending: string[] = [];

  for await (const piece of chunkStream) {
    pending.push(piece);

    if (pending.length >= batchSize) {
      // Batch is full: hand it off and start collecting the next one.
      // `splice(0)` empties `pending` and returns the removed chunks.
      await processBatch(pending.splice(0));
    }
  }

  // Process remaining chunks
  if (pending.length > 0) {
    await processBatch(pending.splice(0));
  }
}

/**
 * Placeholder batch handler: logs how many chunks the batch contains.
 * Replace the body with real work (save to a database, send to a queue, ...).
 */
async function processBatch(chunks: string[]) {
  const chunkCount = chunks.length;
  console.log(`Processing batch of ${chunkCount} chunks`);
  // Do something with the batch
  // e.g., save to database, send to queue, etc.
}
```

## Using with `runs.subscribeToRun()`

For more advanced use cases where you need both the run status and streams, you can use the `runs.subscribeToRun()` method with `.withStreams()`:

```ts
import { runs } from "@trigger.dev/sdk";
import type { myTask } from "./trigger/myTask";

/**
 * Follows a run through `runs.subscribeToRun().withStreams()`, logging the
 * run's status on "run" updates and the chunk on "default" updates.
 */
async function subscribeToRunAndStreams(runId: string) {
  const subscription = runs.subscribeToRun<typeof myTask>(runId).withStreams();

  for await (const event of subscription) {
    if (event.type === "run") {
      console.log("Run update:", event.run.status);
    } else if (event.type === "default") {
      console.log("Stream chunk:", event.chunk);
    }
  }
}
```

<Note>
  For most use cases, we recommend using `streams.read()` with defined streams for better type safety and clearer code. Use `runs.subscribeToRun().withStreams()` only when you need to track both run status and stream data simultaneously.
</Note>
